Skip to main content

API Integration ExecModules

The ValkyrAI Workflow Engine provides powerful API integration capabilities through specialized ExecModules. These modules enable seamless connectivity with external services using modern protocols and standards with enterprise-grade security and reliability.

Available API Integration Modules

REST API Client (api.rest.generic)

Generic REST API client with OpenAPI specification support for automatic request construction.

Configuration Schema:

{
"operationId": "getPetById",
"params": {
"petId": "12345",
"includeDetails": "true"
},
"body": {
"name": "Fluffy",
"status": "available"
},
"headers": {
"Content-Type": "application/json",
"Accept": "application/json"
},
"mapping_profile": "petstore_v1"
}

OpenAPI Integration:

  • Automatically resolves endpoints from ExecModule.specs[0]
  • Validates request/response schemas
  • Generates proper HTTP methods and paths
  • Supports parameter serialization

Integration Account Requirements:

  • apiKey: API key or Bearer token
  • password: Additional secret if required

WorkflowState Keys Emitted:

  • api.rest.status: HTTP response status code
  • api.rest.body: Response body (truncated if large)
  • api.rest.headers: Response headers (truncated)

GraphQL Client (api.graphql.generic)

Execute GraphQL queries and mutations with variable support and error handling.

Configuration Schema:

{
"endpoint": "https://api.github.com/graphql",
"query": "query GetUser($login: String!) { user(login: $login) { name email bio } }",
"variables": {
"login": "octocat"
},
"operationName": "GetUser"
}

Features:

  • Full GraphQL specification support
  • Query validation and optimization
  • Automatic error extraction and handling
  • Response data normalization

Integration Account Requirements:

  • apiKey: GraphQL API token or Bearer token
  • password: (Optional) Additional authentication

WorkflowState Keys Emitted:

  • api.gql.data: GraphQL response data
  • api.gql.errors: GraphQL errors array

gRPC Client (api.grpc.generic)

Generic gRPC client with support for unary and server-streaming calls.

Configuration Schema:

{
"target": "api.service.com:443",
"fullMethodName": "greeter.v1.GreeterService/SayHello",
"message": {
"name": "World",
"language": "en"
},
"metadata": {
"authorization": "Bearer token123",
"x-request-id": "req_12345"
}
}

Features:

  • Protocol buffer serialization/deserialization
  • Server streaming support with EventLog per message
  • Automatic retry with exponential backoff
  • Connection pooling and load balancing

Integration Account Requirements:

  • apiKey: gRPC authentication token
  • password: (Optional) Additional metadata

WorkflowState Keys Emitted:

  • api.grpc.messages: JSON array of received messages (size-capped)

WebSocket Client (api.ws.client)

Real-time WebSocket communication with connection lifecycle management.

Configuration Schema:

{
"url": "wss://api.example.com/realtime",
"initial_message": {
"type": "subscribe",
"channels": ["orders", "notifications"]
},
"ping_interval_ms": 30000,
"close_after_ms": 300000
}

Features:

  • Automatic connection management
  • Ping/pong heartbeat support
  • Message frame logging with payload hashing
  • Graceful disconnection handling

Integration Account Requirements:

  • apiKey: WebSocket authentication token
  • password: (Optional) Additional credentials

WorkflowState Keys Emitted:

  • api.ws.last_frame_hash: Hash of last received frame
  • api.ws.frames_count: Total frames received

Webhook Receiver (api.webhook.receiver)

Secure webhook endpoint with signature verification and payload processing.

Configuration Schema:

{
"path": "/_hooks/stripe",
"signing_secret": "whsec_1234567890abcdef",
"replay_target_module": "payment_processor"
}

Security Features:

  • HMAC-SHA256 signature verification
  • Payload deduplication
  • Request rate limiting
  • IP whitelist support (configurable)

Integration Account Requirements:

  • apiKey: Webhook signing secret
  • password: (Optional) Additional verification

WorkflowState Keys Emitted:

  • webhook.last_event_id: Last processed event ID
  • webhook.last_result: Processing result summary

Advanced Features

OpenAPI Integration

The REST module provides sophisticated OpenAPI integration:

// Automatic endpoint resolution from spec
ExecModule restModule = new ExecModule()
.moduleType("api.rest.generic")
.specs(List.of(openApiSpec)) // Automatically loaded
.moduleData("""
{
"operationId": "createUser",
"body": {"name": "John", "email": "john@example.com"}
}
""");

Server Streaming Support

gRPC and WebSocket modules support real-time data streams:

// Monitor server-streaming responses
workflow.getEventLog()
.filter(event -> event.getEventDetails().contains("Received"))
.subscribe(event -> {
log.info("New message: {}", event.getEventDetails());
});

Response Processing Pipeline

All modules include comprehensive response processing:

  1. Validation: Schema validation against OpenAPI/GraphQL specs
  2. Transformation: Data format conversion and normalization
  3. Truncation: Large response handling with size limits
  4. Redaction: Automatic PII and sensitive data removal
  5. Caching: Response caching with configurable TTL

Security Features

Authentication Support

Multiple authentication mechanisms supported across protocols:

REST/GraphQL:

  • Bearer tokens
  • API keys (header/query parameter)
  • Basic authentication
  • OAuth 2.0 flows
  • Custom header schemes

gRPC:

  • TLS client certificates
  • JWT tokens in metadata
  • Custom authentication headers

WebSocket:

  • Token-based authentication
  • Connection upgrade headers
  • Subprotocol negotiation

Data Protection

  • Request/Response Logging: Sensitive data automatically redacted
  • Credential Security: Never log API keys or tokens
  • TLS/SSL: Enforce encrypted connections
  • Rate Limiting: Prevent abuse and respect API quotas

RBAC Controls

All API modules enforce role-based access:

  • ADMIN: Full access to all API operations
  • USER: Restricted access with approval workflows
  • ANONYMOUS: Blocked from sensitive operations

Error Handling & Resilience

HTTP Status Code Handling

REST module provides detailed HTTP error handling:

// Automatic retry for transient errors
if (status >= 500) {
return RetryDecision.RETRY_WITH_BACKOFF;
} else if (status == 429) {
return RetryDecision.RETRY_AFTER_DELAY;
} else if (status >= 400) {
return RetryDecision.FAIL_IMMEDIATELY;
}

Circuit Breaker Pattern

Automatic circuit breaking for failing services:

  • Closed: Normal operation, requests flow through
  • Open: Service failing, requests rejected immediately
  • Half-Open: Testing service recovery

Timeout Management

Configurable timeouts at multiple levels:

  • Connection Timeout: Time to establish connection
  • Request Timeout: Time to complete request/response
  • Stream Timeout: Time for streaming operations

Usage Examples

Multi-Service API Orchestration

// Orchestrate multiple API calls
Workflow apiOrchestration = new Workflow()
.addModule(new ExecModule()
.moduleType("api.rest.generic")
.moduleData("""
{
"operationId": "getUserProfile",
"params": {"userId": "12345"}
}
"""))
.addModule(new ExecModule()
.moduleType("api.graphql.generic")
.moduleData("""
{
"endpoint": "https://api.github.com/graphql",
"query": "query { viewer { login name } }"
}
"""))
.addModule(new ExecModule()
.moduleType("api.grpc.generic")
.moduleData("""
{
"target": "recommendations.api.com:443",
"fullMethodName": "recommendations.v1.Service/GetRecommendations",
"message": {"userId": "12345", "limit": 10}
}
"""));

Real-time Data Processing

// WebSocket + Webhook integration
Workflow realtimeProcessing = new Workflow()
.addModule(webhookReceiver) // Receive external events
.addModule(websocketClient) // Stream to real-time clients
.addModule(restNotification); // Send notifications via REST

API Gateway Pattern

// Use as API gateway with routing
ExecModule apiGateway = new ExecModule()
.moduleType("api.rest.generic")
.moduleData("""
{
"operationId": "routeRequest",
"headers": {"X-Route-Target": "{{workflow.route_target}}"},
"body": "{{workflow.request_body}}"
}
""");

Monitoring & Observability

Request/Response Tracing

All API calls include comprehensive tracing:

// Extract API metrics from WorkflowState
Map<String, Object> apiMetrics = workflow.getWorkflowState()
.stream()
.filter(state -> state.getKey().startsWith("api."))
.collect(Collectors.toMap(
WorkflowState::getKey,
WorkflowState::getValue
));

Performance Monitoring

Track API performance across all protocols:

  • Latency: Request/response timing
  • Throughput: Requests per second
  • Error Rates: 4xx/5xx error percentages
  • Availability: Service uptime metrics

EventLog Integration

Rich event logging for debugging and monitoring:

// EventLog entries for each API call
2024-01-15T10:00:00Z [INFO] Starting REST API call: getUserProfile
2024-01-15T10:00:00Z [INFO] Request sent to: https://api.example.com/users/12345
2024-01-15T10:00:01Z [INFO] Response received: 200 OK (1.2s)
2024-01-15T10:00:01Z [INFO] Response size: 2.3KB (truncated)

Best Practices

API Design

  1. Idempotency: Design APIs to be safe for retries
  2. Versioning: Use proper API versioning strategies
  3. Pagination: Handle large datasets appropriately
  4. Rate Limiting: Respect API rate limits

Error Handling

  1. Graceful Degradation: Provide fallback responses
  2. Retry Logic: Implement exponential backoff
  3. Circuit Breakers: Prevent cascading failures
  4. Timeout Configuration: Set appropriate timeouts

Security

  1. Token Rotation: Regularly rotate API credentials
  2. Least Privilege: Use minimal required permissions
  3. Request Signing: Verify request authenticity
  4. Audit Logging: Log all API interactions

Performance

  1. Connection Pooling: Reuse HTTP connections
  2. Response Caching: Cache appropriate responses
  3. Compression: Enable request/response compression
  4. Load Balancing: Distribute API load appropriately